package demo.components.kafka.demo;

import demo.components.kafka.KafkaConsumer;
import demo.components.kafka.KafkaMessageHandler;

/**
 * Manual demo entry point that starts one background thread consuming a single topic
 * through {@link KafkaConsumer}.
 *
 * <p>Usage: {@code KafkaConsumerTest [groupId [zks [topic]]]}. Every argument is optional;
 * an omitted or empty argument falls back to the built-in default, so running without
 * arguments uses the same settings that were previously hard-coded.
 */
public class KafkaConsumerTest {

    /** Consumer group id used when none is supplied on the command line. */
    private static final String DEFAULT_GROUP_ID = "nlb_ddb";

    /**
     * Default value for the second {@link KafkaConsumer} constructor argument.
     * NOTE(review): presumably a ZooKeeper {@code host:port} connect string - confirm.
     */
    private static final String DEFAULT_ZKS = "10.180.156.21:2181";

    /** Topic consumed when none is supplied on the command line. */
    private static final String DEFAULT_TOPIC = "napm.comb.nonagent";

    /**
     * Builds the consumer and hands it to a {@link CollectorTask} running on its own thread.
     *
     * @param args optional positional overrides: {@code [0]} group id, {@code [1]} zks
     *             address, {@code [2]} topic
     */
    public static void main(String[] args) {
        String groupId = argOrDefault(args, 0, DEFAULT_GROUP_ID);
        String zks = argOrDefault(args, 1, DEFAULT_ZKS);
        String topic = argOrDefault(args, 2, DEFAULT_TOPIC);

        KafkaConsumer kafkaConsumer = new KafkaConsumer(groupId, zks);

        // Name the thread so it is identifiable in stack traces and thread dumps.
        Thread worker = new Thread(
                new CollectorTask(new Handler(), kafkaConsumer, topic),
                "kafka-collector-" + topic);
        worker.start();
    }

    /**
     * Returns the positional argument at {@code index}, or {@code fallback} when the
     * argument array is null, too short, or holds a null/empty value at that position.
     */
    private static String argOrDefault(String[] args, int index, String fallback) {
        if (args == null || index >= args.length) {
            return fallback;
        }
        String value = args[index];
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    /**
     * Runnable that calls {@link KafkaConsumer#consume} for one topic with one handler and
     * reports any failure on standard error instead of propagating it.
     */
    static class CollectorTask implements Runnable {
        // Assigned once in the constructor and only read afterwards.
        private final KafkaMessageHandler handler;
        private final KafkaConsumer kafkaConsumer;
        private final String topic;

        /**
         * @param handler       handler passed to {@code consume}
         * @param kafkaConsumer consumer on which {@code consume} is invoked
         * @param topic         topic name passed to {@code consume}
         */
        public CollectorTask(KafkaMessageHandler handler, KafkaConsumer kafkaConsumer, String topic) {
            this.handler = handler;
            this.kafkaConsumer = kafkaConsumer;
            this.topic = topic;
        }

        /**
         * Invokes {@code consume(topic, handler)}; anything thrown is printed to standard
         * error together with the topic name, after which the method returns.
         */
        @Override
        public void run() {
            try {
                kafkaConsumer.consume(topic, handler);
            } catch (Throwable e) {
                // Kept broad on purpose: the declared exceptions of consume() are not
                // visible here, and this is the top frame of the worker thread.
                System.err.println("Consuming topic '" + topic + "' failed:");
                e.printStackTrace();
            }
        }
    }

}
